package fun.easycode.datastream;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Canal binlog解析器
 * 1. 从canal server获取binlog数据
 * 2. 解析binlog数据
 * 3. 转换RowData
 * 4. 使用转换后的数据匹配IDataProcessor和IDataTransformer进行处理
 *
 * @author xuzhen97
 */
@Slf4j
public class CanalAdapter implements Runnable {

    private final CanalConnector connector;
    private final Map<String, List<CanalListener>> tableNameListeners = new HashMap<>();
    private final CanalRowDataParser parser;
    private final DataStreamProperties properties;
    private boolean running = true;

    public CanalAdapter(CanalConnector connector, CanalRowDataParser parser, DataStreamProperties properties) {
        this.connector = connector;
        this.parser = parser;
        this.properties = properties;
    }

    /**
     * 启动canal manager，持续从canal server获取binlog数据
     * Context 上下文管理
     */
    public void start() {
        this.running = true;
        CompletableFuture.runAsync(this);
        log.info("canal adapter started");
    }

    /**
     * 停止canal manager
     * Context 上下文管理
     */
    public void stop() {
        this.running = false;
        log.info("canal adapter stopped");
    }

    /**
     * 注册监听器
     *
     * @param listener 监听器
     */
    public synchronized void registerListener(CanalListener listener) {
        tableNameListeners.computeIfAbsent(listener.getTableName(), k -> new ArrayList<>())
                .add(listener);
    }

    @Override
    public void run() {
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(properties.getCanal().getSubscribe());
            connector.rollback();
            while (running) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                try {
                    // 解析数据
                    processEntries(message.getEntries());
                    // 提交确认
                    connector.ack(batchId);
                } catch (Exception e) {
                    // 处理失败, 回滚数据
                    connector.rollback(batchId);
                    log.error("处理数据出现异常", e);
                }
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 处理binlog数据
     *
     * @param entries binlog数据
     */
    private void processEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
            }

            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                log.warn("DDL语句:{}", rowChage.getSql());
                continue;
            }

            CanalEntry.EventType eventType = rowChage.getEventType();

            String tableName = entry.getHeader().getTableName();

            // 如果没有监听器，跳过
            if (!tableNameListeners.containsKey(tableName)) {
                log.warn("没有监听器: {}", tableName);
                continue;
            }

            for (CanalListener listener : tableNameListeners.get(tableName)) {
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    DataEntry dataEntry = parser.parse(rowData, eventType, listener.getTargetClass());

                    listener.onMessage(dataEntry);
                    // 打印数据收集日志方错误排查
                    printLog(tableName, eventType, rowData);
                }
            }
        }
    }

    /**
     * 打印日志
     *
     * @param tableName 表名
     * @param eventType 事件类型
     * @param rowData   行数据
     */
    private void printLog(String tableName, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType == CanalEntry.EventType.DELETE) {
            log.info("tableName: {} , eventType: {} , sqlData: {}"
                    , tableName, eventType, getSqlData(rowData.getBeforeColumnsList()));
        } else if (eventType == CanalEntry.EventType.INSERT) {
            log.info("tableName: {} , eventType: {} , sqlData: {}"
                    , tableName, eventType, getSqlData(rowData.getAfterColumnsList()));
        } else {
            String beforeSqlData = getSqlData(rowData.getBeforeColumnsList());
            String afterSqlData = getSqlData(rowData.getAfterColumnsList());

            log.info("tableName: {}, eventType: {} , beforeSqlData: {} , afterSqlData: {}"
                    , tableName, eventType, beforeSqlData, afterSqlData);
        }

    }

    /**
     * 获取sql数据
     *
     * @param columns 列数据
     * @return sql数据
     */
    private String getSqlData(List<CanalEntry.Column> columns) {
        StringBuilder sb = new StringBuilder("[ ");
        for (CanalEntry.Column column : columns) {
            sb.append(column.getName()).append(" = ")
                    .append(column.getValue()).append(" : update = ")
                    .append(column.getUpdated()).append(" , ");
        }
        sb.append("]");
        return sb.toString();
    }
}
